/*
 * Copyright 1999-2011 Alibaba Group.
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

package com.alibaba.dubbo.rpc.protocol.dubbo;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.util.ReflectionUtils;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.serialize.ObjectInput;
import com.alibaba.dubbo.common.utils.Assert;
import com.alibaba.dubbo.common.utils.ReflectUtils;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.Codec;
import com.alibaba.dubbo.remoting.Decodeable;
import com.alibaba.dubbo.remoting.RemoteCalling;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.transport.CodecSupport;
import com.alibaba.dubbo.rpc.RpcInvocation;

import static com.alibaba.dubbo.rpc.protocol.dubbo.CallbackServiceCodec.decodeInvocationArgument;

/**
 * @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
 */
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable, RemoteCalling {

    private static final Logger log = LoggerFactory.getLogger(DecodeableRpcInvocation.class);

    private Channel     channel;

    private byte        serializationType;

    private InputStream inputStream;

    private Request     request;

    private volatile boolean hasDecoded;

    public DecodeableRpcInvocation(Channel channel, Request request, InputStream is, byte id) {
        Assert.notNull(channel, "channel == null");
        Assert.notNull(request, "request == null");
        Assert.notNull(is, "inputStream == null");
        this.channel = channel;
        this.request = request;
        this.inputStream = is;
        this.serializationType = id;
    }

    public void decode() throws Exception {
        if (!hasDecoded && channel != null && inputStream != null) {
            try {
                decode(channel, inputStream);
            } catch (Throwable e) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
                }
                request.setBroken(true);
                request.setData(e);
            } finally {
                hasDecoded = true;
            }
        }
    }

    public void encode(Channel channel, OutputStream output, Object message) throws IOException {
        throw new UnsupportedOperationException();
    }
    
    @Override
	public String target() {
		// TODO Auto-generated method stub
		return this.getAttachment(Constants.PATH_KEY);
	}

	@Override
	public String method() {
		return this.getMethodName();
	}

	@Override
	public Class<?>[] parameterTypes() {
		return this.getParameterTypes();
	}

	@Override
	public Object[] parameters() {
		return getArguments();
	}

    public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

        setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
        setAttachment(Constants.PATH_KEY, in.readUTF());
        setAttachment(Constants.VERSION_KEY, in.readUTF());

        setMethodName(in.readUTF());
        Method method = null;
        try {
            Object[] args;
            Class<?>[] pts = null;
            int paramCount = in.readInt();
            
            if (paramCount  == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            }else if(paramCount < 0){
            	//aruan 2014.09.15 如果传入数据为-1
            	//说明客户端可能不是java,由服务器自行决定
            	Class<?> target = Class.forName(this.getAttachment(Constants.PATH_KEY));
        		Method m = ReflectUtils.findMethodByMethodName(target, this.getMethodName());
        		if(m == null) throw new  NoSuchMethodException(this.getMethodName());
       			pts = m.getParameterTypes();
       			paramCount = pts.length;
        		 args = new Object[pts.length];
                 for (int i = 0; i < args.length; i++) {
                     try {
                         args[i] = in.readObject(pts[i]);
                     } catch (Exception e) {
                         if (log.isWarnEnabled()) {
                             log.warn("Decode argument failed: " + e.getMessage(), e);
                         }
                     }
                 }
            } else{
            	Class<?> target = Class.forName(this.getAttachment(Constants.PATH_KEY));
            	Method m = ReflectUtils.findMethodByMethodName(target, this.getMethodName(), paramCount);
            	if(m == null) throw new  NoSuchMethodException(this.getMethodName());
            	pts = m.getParameterTypes();
                args = new Object[paramCount];
                for (int i = 0; i < args.length; i++) {
                    try {
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            
            ReflectUtils.makeCompatible(pts, args);
            setParameterTypes(pts);
            //翻译attachments
            Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = getAttachments();
                if (attachment == null) {
                    attachment = new HashMap<String, String>();
                }
                attachment.putAll(map);
                setAttachments(attachment);
            }

            
            
            //decode argument ,may be callback
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            setArguments(args);

        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        } catch (NoSuchMethodException e) {
        	throw new IOException(StringUtils.toString("Read invocation data failed.", e));
		}
        return this;
    }

	@Override
	public Map<String, String> headers() {
		return this.getHeaders();
	}

}
